package com.seaboxdata.threadHandler;

import com.alibaba.fastjson.JSONObject;
import com.seaboxdata.entity.HotData;
import com.seaboxdata.kafka.BaseStringKafkaConsumer;
import com.seaboxdata.kafka.BaseStringKafkaProducer;
import com.seaboxdata.service.HotDataService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * @author
 * @create 2020-11-10 10:47
 **/
@Component
@Slf4j
public class HotDataHandler implements DisposableBean, Runnable {

    private Thread thread;

    @Autowired
    private HotDataService hotDataService;

    @Autowired
    private BaseStringKafkaConsumer baseStringKafkaConsumer;

    @Value("${real.trans.hotDataTopic}")
    private String topic;

    @Value("${interface.isTest}")
    private String isTest;

    @Autowired
    private BaseStringKafkaProducer baseStringKafkaProducer;

    private boolean flag;

    @Autowired
    public void initThread() {
        if("true".equals(isTest)){
            return;
        }
        this.flag = true;
        this.thread = new Thread(this);
        thread.setName("hotDataThread");
        this.thread.start();
        log.info("hotDataThread start");
    }

    @Override
    public void run() {
        KafkaConsumer<String, String> consumer = baseStringKafkaConsumer.getConsumer(topic);
        while (flag) {
            boolean success = true;
            ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(1000L));
            List<HotData> results = new ArrayList<>();
            for (ConsumerRecord<String, String> message : messages) {
                results.add(JSONObject.parseObject(message.value(), HotData.class));
            }
            //写入数据库
            if (!CollectionUtils.isEmpty(results)) {
                try {
                    hotDataService.saveBatch(results);
                    consumer.commitSync();
                } catch (Exception e) {
                    log.error("hotData commit offset failed");
                    e.printStackTrace();
                    //success = false;
                }

                /*if (success) {
                    try {
                        hotDataService.saveBatch(results);
                    } catch (Exception e) {
                        log.error("batch insert hotData failed");
                        e.printStackTrace();
                        for (HotData hotData : results) {
                            try {
                                baseStringKafkaProducer.sendMessageToKafka(JSONObject.toJSONString(hotData), topic);
                            } catch (Exception e1) {
                                log.error("write database failed and write kafka hotData error");
                                e1.printStackTrace();
                            }
                        }
                    }
                }*/
            }
        }
    }

    @Override
    public void destroy() throws Exception {
        this.flag = false;
    }
}
